{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "00bc6543",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql import types"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "cd4a0f3d",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARNING: An illegal reflective access operation has occurred\n",
      "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
      "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
      "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
      "WARNING: All illegal access operations will be denied in a future release\n",
      "22/03/07 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "spark = SparkSession.builder \\\n",
    "    .master(\"local[*]\") \\\n",
    "    .appName('test') \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "eb3e4c36",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'3.0.3'"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "5236cebd",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-rw-rw-r-- 1 alexey alexey 700M Oct 29 18:53 fhvhv_tripdata_2021-02.csv\r\n"
     ]
    }
   ],
   "source": [
    "!ls -lh fhvhv_tripdata_2021-02.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "0a3399a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = types.StructType([\n",
    "    types.StructField('hvfhs_license_num', types.StringType(), True),\n",
    "    types.StructField('dispatching_base_num', types.StringType(), True),\n",
    "    types.StructField('pickup_datetime', types.TimestampType(), True),\n",
    "    types.StructField('dropoff_datetime', types.TimestampType(), True),\n",
    "    types.StructField('PULocationID', types.IntegerType(), True),\n",
    "    types.StructField('DOLocationID', types.IntegerType(), True),\n",
    "    types.StructField('SR_Flag', types.StringType(), True)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "68bc8b72",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read \\\n",
    "    .option(\"header\", \"true\") \\\n",
    "    .schema(schema) \\\n",
    "    .csv('fhvhv_tripdata_2021-02.csv')\n",
    "\n",
    "df = df.repartition(24)\n",
    "\n",
    "df.write.parquet('data/pq/fhvhv/2021/02/', compression=)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "58989b55",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 0:>                                                          (0 + 1) / 1]\r",
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df = spark.read.parquet('data/pq/fhvhv/2021/02/')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "48b01d2f",
   "metadata": {},
   "source": [
    "**Q3**: How many taxi trips were there on February 15?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "f7489aea",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "6c2500fd",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "data": {
      "text/plain": [
       "367170"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df \\\n",
    "    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
    "    .filter(\"pickup_date = '2021-02-15'\") \\\n",
    "    .count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "dd7ae60d",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.registerTempTable('fhvhv_2021_02')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "6d47c147",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 20:>                                                         (0 + 4) / 4]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|count(1)|\n",
      "+--------+\n",
      "|  367170|\n",
      "+--------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 20:==============>                                           (1 + 3) / 4]\r",
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    COUNT(1)\n",
    "FROM \n",
    "    fhvhv_2021_02\n",
    "WHERE\n",
    "    to_date(pickup_datetime) = '2021-02-15';\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ae3f533b",
   "metadata": {},
   "source": [
    "**Q4**: Longest trip for each day"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "7befe422",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['hvfhs_license_num',\n",
       " 'dispatching_base_num',\n",
       " 'pickup_datetime',\n",
       " 'dropoff_datetime',\n",
       " 'PULocationID',\n",
       " 'DOLocationID',\n",
       " 'SR_Flag']"
      ]
     },
     "execution_count": 29,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "id": "279d9161",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 37:==============>                                           (1 + 3) / 4]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------+-------------+\n",
      "|pickup_date|max(duration)|\n",
      "+-----------+-------------+\n",
      "| 2021-02-11|        75540|\n",
      "| 2021-02-17|        57221|\n",
      "| 2021-02-20|        44039|\n",
      "| 2021-02-03|        40653|\n",
      "| 2021-02-19|        37577|\n",
      "+-----------+-------------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 38:==================================================>   (187 + 4) / 200]\r",
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df \\\n",
    "    .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \\\n",
    "    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
    "    .groupBy('pickup_date') \\\n",
    "        .max('duration') \\\n",
    "    .orderBy('max(duration)', ascending=False) \\\n",
    "    .limit(5) \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "id": "74cf0e8b",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 43:>                                                         (0 + 4) / 4]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------+-----------------+\n",
      "|pickup_date|         duration|\n",
      "+-----------+-----------------+\n",
      "| 2021-02-11|           1259.0|\n",
      "| 2021-02-17|953.6833333333333|\n",
      "| 2021-02-20|733.9833333333333|\n",
      "| 2021-02-03|           677.55|\n",
      "| 2021-02-19|626.2833333333333|\n",
      "| 2021-02-25|            583.5|\n",
      "| 2021-02-18|576.8666666666667|\n",
      "| 2021-02-10|569.4833333333333|\n",
      "| 2021-02-21|           537.05|\n",
      "| 2021-02-09|534.7833333333333|\n",
      "+-----------+-----------------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 44:================================================>     (180 + 4) / 200]\r",
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    to_date(pickup_datetime) AS pickup_date,\n",
    "    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration\n",
    "FROM \n",
    "    fhvhv_2021_02\n",
    "GROUP BY\n",
    "    1\n",
    "ORDER BY\n",
    "    2 DESC\n",
    "LIMIT 10;\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d915096b",
   "metadata": {},
   "source": [
    "**Q5**: Most frequent `dispatching_base_num`\n",
    "\n",
    "How many stages this spark job has?\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "id": "25816aa2",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 73:>                                                         (0 + 4) / 4]\r",
      "\r",
      "[Stage 73:==============>                                           (1 + 3) / 4]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------+\n",
      "|dispatching_base_num|count(1)|\n",
      "+--------------------+--------+\n",
      "|              B02510| 3233664|\n",
      "|              B02764|  965568|\n",
      "|              B02872|  882689|\n",
      "|              B02875|  685390|\n",
      "|              B02765|  559768|\n",
      "+--------------------+--------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 74:===================================================>  (189 + 5) / 200]\r",
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    dispatching_base_num,\n",
    "    COUNT(1)\n",
    "FROM \n",
    "    fhvhv_2021_02\n",
    "GROUP BY\n",
    "    1\n",
    "ORDER BY\n",
    "    2 DESC\n",
    "LIMIT 5;\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "id": "a78f9fe3",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 86:>                                                         (0 + 4) / 4]\r",
      "\r",
      "[Stage 86:=============================>                            (2 + 2) / 4]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------+\n",
      "|dispatching_base_num|  count|\n",
      "+--------------------+-------+\n",
      "|              B02510|3233664|\n",
      "|              B02764| 965568|\n",
      "|              B02872| 882689|\n",
      "|              B02875| 685390|\n",
      "|              B02765| 559768|\n",
      "+--------------------+-------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "[Stage 87:===========================================>          (161 + 5) / 200]\r",
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df \\\n",
    "    .groupBy('dispatching_base_num') \\\n",
    "        .count() \\\n",
    "    .orderBy('count', ascending=False) \\\n",
    "    .limit(5) \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0d10173a",
   "metadata": {},
   "source": [
    "**Q6**: Most common locations pair"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "74b7f664",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones = spark.read.parquet('zones')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "id": "81642d3b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['LocationID', 'Borough', 'Zone', 'service_zone']"
      ]
     },
     "execution_count": 49,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_zones.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "id": "4f460dda",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['hvfhs_license_num',\n",
       " 'dispatching_base_num',\n",
       " 'pickup_datetime',\n",
       " 'dropoff_datetime',\n",
       " 'PULocationID',\n",
       " 'DOLocationID',\n",
       " 'SR_Flag']"
      ]
     },
     "execution_count": 51,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "id": "ad8f0101",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones.registerTempTable('zones')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "id": "6f738414",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 103:==============================================>      (176 + 4) / 200]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------+\n",
      "|          pu_do_pair|count(1)|\n",
      "+--------------------+--------+\n",
      "|East New York / E...|   45041|\n",
      "|Borough Park / Bo...|   37329|\n",
      "| Canarsie / Canarsie|   28026|\n",
      "|Crown Heights Nor...|   25976|\n",
      "|Bay Ridge / Bay R...|   17934|\n",
      "+--------------------+--------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\r",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "spark.sql(\"\"\"\n",
    "SELECT\n",
    "    CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,\n",
    "    COUNT(1)\n",
    "FROM \n",
    "    fhvhv_2021_02 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID\n",
    "                      LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID\n",
    "GROUP BY \n",
    "    1\n",
    "ORDER BY\n",
    "    2 DESC\n",
    "LIMIT 5;\n",
    "\"\"\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4b754d1",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
